Health Checks and Readiness

What You Will Learn

  • The critical difference between liveness and readiness probes, and the consequences of getting them wrong
  • How to design liveness checks that detect real application failure without causing cascading restarts
  • How to build readiness endpoints that accurately reflect service health using parallel dependency checks
  • How to use the py-healthcheck library for structured health reporting
  • A complete, production-ready FastAPI health endpoint implementation
  • How startup probes solve the slow-start problem for ML services
  • SLOs, error budgets, and burn rate alerting
  • Health check anti-patterns that silently drop traffic

Prerequisites

Requirement | Details
Python 3.11+ | asyncio.gather, asyncio.timeout used
FastAPI + asyncpg + redis-py | Health check targets
Kubernetes basics | Probe configuration concepts
py-healthcheck | pip install py-healthcheck
Lessons 01–04 complete | Full observability stack assumed

The Incident: "Ready" Pod Dropping 40% of Requests

kubectl get pods -n production:

NAME READY STATUS RESTARTS AGE
document-api-7d9b4f-xk2mn 1/1 Running 0 23m
document-api-7d9b4f-p8q7r 1/1 Running 0 23m
document-api-7d9b4f-t3n9s 1/1 Running 0 23m

All three pods are Running and 1/1 (Ready). Users are getting errors on 40% of requests - the kind that look like server errors but return quickly. Your support queue is growing.

The health check that Kubernetes is trusting:

@app.get("/health")
async def health():
    return {"status": "ok"}

This endpoint always returns 200 OK, regardless of whether the application can actually handle requests. It does not check the database. It does not check Redis. It does not check the connection pool. It does not check anything.

What is actually happening: the database connection pool on all three pods is exhausted. New requests get a connection pool error within 30ms (hence the fast error response that doesn't look like a timeout). Kubernetes sees all health checks passing and keeps routing traffic to the broken pods.

The fix: a readiness check that verifies the database connection pool before declaring the pod ready to receive traffic. If the pool is exhausted, return 503. Kubernetes removes the pod from the load balancer rotation. Traffic stops hitting the broken pod. Alerts fire. Engineers investigate.

This lesson is about building health checks that Kubernetes can actually trust.

1. Liveness vs Readiness vs Startup Probes

Kubernetes uses three types of probes to manage pod lifecycle. Getting these wrong has severe consequences.

Probe Types

Probe | Question | Failure Action | Configured By
Liveness | Is the application alive and not stuck? | Kill and restart the pod | livenessProbe
Readiness | Is the application ready to accept traffic? | Remove from load balancer (do not restart) | readinessProbe
Startup | Has the application finished starting up? | Kill and restart if not ready within startupProbe deadline | startupProbe

The Critical Distinction

Liveness failure → pod restart. A restart terminates all in-flight requests, drops all in-memory state, and adds to RESTARTS count. Do not trigger restarts unless the application is genuinely stuck.

Readiness failure → traffic stop. The pod keeps running but receives no new requests from the load balancer. In-flight requests complete. The pod is not restarted. Traffic resumes when readiness recovers.

Kubernetes Configuration

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: document-api
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: document-api
          image: document-api:2.14.0
          ports:
            - containerPort: 8001

          # Startup probe: runs first, before liveness or readiness
          # Gives the app time to start without triggering liveness failures
          startupProbe:
            httpGet:
              path: /startup
              port: 8001
            initialDelaySeconds: 5
            periodSeconds: 5
            failureThreshold: 24  # Allow up to 2 minutes to start (24 * 5s)
            successThreshold: 1
            timeoutSeconds: 5

          # Liveness probe: only checks if the process is alive
          # Runs after startup probe succeeds
          livenessProbe:
            httpGet:
              path: /liveness
              port: 8001
            initialDelaySeconds: 0
            periodSeconds: 10
            failureThreshold: 3  # 3 consecutive failures → restart
            successThreshold: 1
            timeoutSeconds: 5

          # Readiness probe: checks if the pod can serve traffic
          # Runs concurrently with liveness after startup succeeds
          readinessProbe:
            httpGet:
              path: /readiness
              port: 8001
            initialDelaySeconds: 0
            periodSeconds: 10
            failureThreshold: 3
            successThreshold: 2  # Require 2 successes to become ready again
            timeoutSeconds: 10  # Longer timeout - dependency checks take time
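
One way to read failureThreshold and successThreshold is as a small state machine over consecutive probe results. The sketch below is a simplified model of that behaviour, not the kubelet's actual implementation; the function name readiness_states and its parameters are made up for illustration.

```python
from typing import Iterable, List


def readiness_states(
    probe_results: Iterable[bool],
    failure_threshold: int = 3,
    success_threshold: int = 2,
    initially_ready: bool = True,
) -> List[bool]:
    """Return the modelled Ready state after each probe result.

    Simplified model: a ready pod becomes not-ready after
    `failure_threshold` consecutive failures, and a not-ready pod becomes
    ready again after `success_threshold` consecutive successes.
    """
    ready = initially_ready
    consecutive_failures = 0
    consecutive_successes = 0
    states: List[bool] = []
    for ok in probe_results:
        if ok:
            consecutive_successes += 1
            consecutive_failures = 0
            if not ready and consecutive_successes >= success_threshold:
                ready = True
        else:
            consecutive_failures += 1
            consecutive_successes = 0
            if ready and consecutive_failures >= failure_threshold:
                ready = False
        states.append(ready)
    return states
```

With the readiness settings above (3 failures to leave rotation, 2 successes to rejoin), the sequence pass, fail, fail, fail, pass, pass keeps the pod ready through the first two failures, drops it on the third, and restores it only on the second consecutive pass.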

2. Designing Liveness

The liveness probe answers: "Is this process alive and not deadlocked?"

Do check in liveness:

  • Can the asyncio event loop schedule a coroutine? (Deadlock detection)
  • Is the process responding at all?

Do NOT check in liveness:

  • Database connectivity
  • Redis availability
  • External API availability
  • Disk space
  • Anything that can fail transiently

If you check the database in your liveness probe and the database is temporarily unavailable (network blip, maintenance), Kubernetes will restart all your pods simultaneously. This is called a cascading restart and it makes the situation dramatically worse.

Liveness Endpoint

# app/api/routes/health.py
import asyncio
import time
from fastapi import APIRouter, Response

router = APIRouter()

# Track the last time the event loop was confirmed responsive
_last_event_loop_check = time.monotonic()
_EVENT_LOOP_STALENESS_THRESHOLD = 30.0 # seconds


async def _background_event_loop_heartbeat():
    """
    Runs every second as a background task.
    If the event loop is blocked, this task will not run,
    and _last_event_loop_check will become stale.
    """
    global _last_event_loop_check
    while True:
        _last_event_loop_check = time.monotonic()
        await asyncio.sleep(1.0)


@router.get("/liveness")
async def liveness(response: Response):
    """
    Liveness probe: checks only that the asyncio event loop is responsive.

    Returns 200 if alive, 503 if the event loop appears blocked.
    Never checks external dependencies - those belong in /readiness.
    """
    now = time.monotonic()
    staleness = now - _last_event_loop_check

    if staleness > _EVENT_LOOP_STALENESS_THRESHOLD:
        response.status_code = 503
        return {
            "status": "unhealthy",
            "reason": "event_loop_blocked",
            "staleness_seconds": round(staleness, 1),
            "threshold_seconds": _EVENT_LOOP_STALENESS_THRESHOLD,
        }

    return {
        "status": "alive",
        "event_loop_staleness_seconds": round(staleness, 3),
    }

Starting the Background Heartbeat

# app/main.py
from contextlib import asynccontextmanager
import asyncio
from fastapi import FastAPI
from app.api.routes.health import router as health_router, _background_event_loop_heartbeat

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the event loop heartbeat as a background task
    heartbeat_task = asyncio.create_task(_background_event_loop_heartbeat())

    yield

    # Cancel the heartbeat on shutdown
    heartbeat_task.cancel()
    try:
        await heartbeat_task
    except asyncio.CancelledError:
        pass

app = FastAPI(lifespan=lifespan)
app.include_router(health_router)
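
To see why a stale heartbeat indicates a blocked event loop, the following self-contained sketch runs a heartbeat task, then deliberately blocks the loop with a synchronous time.sleep and measures how stale the heartbeat has become. The names last_beat, heartbeat and measure_staleness are illustrative only and are not part of the lesson's application code.

```python
import asyncio
import time

# Updated by the heartbeat task whenever the event loop gets to run it.
last_beat = time.monotonic()


async def heartbeat(interval: float = 0.05) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    global last_beat
    while True:
        last_beat = time.monotonic()
        await asyncio.sleep(interval)


async def measure_staleness(block_seconds: float) -> float:
    """Block the event loop on purpose and report heartbeat staleness."""
    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.1)       # let the heartbeat run a few times
    time.sleep(block_seconds)      # synchronous call: the loop cannot run anything
    staleness = time.monotonic() - last_beat
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return staleness
```

Calling asyncio.run(measure_staleness(0.3)) should report a staleness of at least roughly 0.3 seconds, because the heartbeat could not update its timestamp while the loop was blocked. The liveness endpoint applies the same idea with a 30-second threshold.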

Why Checking DB in Liveness Causes Cascading Restarts

Timeline:
T=0: Database goes down for maintenance (5 minutes)
T=10s: Liveness probe checks DB → fails on all pods
T=30s: 3 consecutive failures → Kubernetes restarts all pods
T=30s: Pods restart, try to connect to DB, fail, crash on startup
T=60s: Kubernetes restarts again with exponential backoff
T=5min: DB comes back online
T=5min: Pods still stuck in CrashLoopBackOff
T=8min: Backoff expires, pods restart and recover

With liveness checking only the event loop:

T=0: Database goes down
T=10s: First readiness probe fails
T=30s: Third consecutive failure (failureThreshold: 3) → pods removed from load balancer
T=30s: Traffic stops (returns 503 from load balancer)
T=5min: DB comes back online
T≈5min20s: Readiness probe passes twice (successThreshold: 2) → pods added back to load balancer
T≈5min20s: Traffic resumes normally - NO RESTARTS, NO CASCADING FAILURE

3. Designing Readiness

The readiness probe answers: "Can this pod handle a request right now?"

Do check in readiness:

  • Primary database: can we get a connection from the pool?
  • Cache (Redis): can we ping it?
  • Critical outbound APIs: can we reach them?
  • Internal state: is the ML model loaded?
  • Custom business logic: are migrations complete?

Design principles:

  • Fail fast: run dependency checks in parallel, not sequentially
  • Timeout every check: a hanging check should not block the probe for 30 seconds
  • Cache recent success: do not hammer the DB on every probe (one probe every 10 seconds per pod adds up: across 30 pods that is 3 probes per second against the database)
  • Be specific in failure messages: return which dependency failed, not just "unhealthy"
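
As a rough illustration of the "fail fast" principle, the sketch below times the same set of checks run one after another and run concurrently with asyncio.gather. The checks are simulated with asyncio.sleep; fake_check, run_sequential and run_parallel are hypothetical names used only for this demonstration.

```python
import asyncio
import time
from typing import Dict


async def fake_check(name: str, delay: float) -> str:
    """Stand-in for a dependency check that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def run_sequential(delays: Dict[str, float]) -> float:
    """Run checks one after another; total time is the sum of the delays."""
    start = time.perf_counter()
    for name, delay in delays.items():
        await fake_check(name, delay)
    return time.perf_counter() - start


async def run_parallel(delays: Dict[str, float]) -> float:
    """Run checks concurrently; total time is roughly the slowest check."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_check(n, d) for n, d in delays.items()))
    return time.perf_counter() - start
```

With three checks of 0.1 seconds each, the sequential version takes about 0.3 seconds while the parallel version takes about 0.1 seconds. The gap widens as dependencies are added, which is why the readiness endpoint gathers its checks.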

Complete Readiness Endpoint

# app/api/routes/health.py (continued)
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, Callable, Awaitable
import structlog

log = structlog.get_logger()

@dataclass
class DependencyStatus:
    name: str
    healthy: bool
    latency_ms: float
    error: Optional[str] = None
    details: Optional[dict] = None


async def _check_with_timeout(
    name: str,
    check_fn: Callable[[], Awaitable[dict]],
    timeout_seconds: float = 3.0,
) -> DependencyStatus:
    """
    Run a health check coroutine with a timeout.
    Returns a DependencyStatus with healthy=False on timeout or exception.
    """
    start = time.perf_counter()
    try:
        async with asyncio.timeout(timeout_seconds):
            details = await check_fn()
        latency_ms = (time.perf_counter() - start) * 1000
        return DependencyStatus(
            name=name,
            healthy=True,
            latency_ms=round(latency_ms, 2),
            details=details,
        )
    except TimeoutError:
        latency_ms = (time.perf_counter() - start) * 1000
        return DependencyStatus(
            name=name,
            healthy=False,
            latency_ms=round(latency_ms, 2),
            error=f"Timed out after {timeout_seconds}s",
        )
    except Exception as exc:
        latency_ms = (time.perf_counter() - start) * 1000
        return DependencyStatus(
            name=name,
            healthy=False,
            latency_ms=round(latency_ms, 2),
            error=str(exc),
        )


# ── Dependency Check Functions ─────────────────────────────────────────────

async def _check_database(db_pool) -> dict:
    """Verify we can acquire a connection and run a trivial query."""
    async with db_pool.acquire() as conn:
        await conn.fetchval("SELECT 1")
    pool_status = {
        "size": db_pool.get_size(),
        "free_size": db_pool.get_idle_size(),
        "min_size": db_pool.get_min_size(),
        "max_size": db_pool.get_max_size(),
    }
    utilisation = (
        (pool_status["size"] - pool_status["free_size"]) / pool_status["size"]
        if pool_status["size"] > 0 else 0
    )
    pool_status["utilisation_pct"] = round(utilisation * 100, 1)
    return pool_status


async def _check_redis(redis_client) -> dict:
    """Verify Redis is responsive with a PING."""
    response = await redis_client.ping()
    info = await redis_client.info("memory")
    return {
        "ping": response,
        "used_memory_human": info.get("used_memory_human"),
        "maxmemory_human": info.get("maxmemory_human"),
    }


async def _check_disk_space(min_free_gb: float = 1.0) -> dict:
    """Verify sufficient disk space for log and temp file writes."""
    import shutil
    total, used, free = shutil.disk_usage("/")
    free_gb = free / (1024 ** 3)
    if free_gb < min_free_gb:
        raise RuntimeError(
            f"Low disk space: {free_gb:.1f}GB free, need {min_free_gb}GB"
        )
    return {
        "total_gb": round(total / (1024 ** 3), 1),
        "used_gb": round(used / (1024 ** 3), 1),
        "free_gb": round(free_gb, 1),
    }


async def _check_ml_model_loaded(classifier) -> dict:
    """Verify the ML model is loaded and can make predictions."""
    # Run a trivial inference to check the model is operational.
    # predict() is synchronous, so run it in a worker thread: calling it
    # directly would block the event loop and the timeout could not fire.
    test_result = await asyncio.to_thread(
        classifier.predict, "health check test input"
    )
    return {
        "model_name": classifier.model_name,
        "model_version": classifier.model_version,
        "loaded": True,
        "test_inference_category": test_result.get("category"),
    }


# ── Readiness Cache ────────────────────────────────────────────────────────

_readiness_cache: Optional[dict] = None
_readiness_cache_time: float = 0.0
_READINESS_CACHE_TTL = 5.0 # Cache readiness result for 5 seconds


@router.get("/readiness")
async def readiness(response: Response):
    """
    Readiness probe: checks all critical dependencies in parallel.

    Returns 200 if all dependencies are healthy, 503 otherwise.
    Results are cached for 5 seconds to avoid hammering dependencies.
    """
    global _readiness_cache, _readiness_cache_time

    now = time.monotonic()
    if _readiness_cache and (now - _readiness_cache_time) < _READINESS_CACHE_TTL:
        if _readiness_cache["status"] != "ready":
            response.status_code = 503
        return _readiness_cache

    # Import dependencies - in real code, use dependency injection
    from app.database import db_pool
    from app.cache import redis_client
    from app.services.classifier import classifier

    # Run all checks in parallel with individual timeouts
    results = await asyncio.gather(
        _check_with_timeout("database", lambda: _check_database(db_pool), timeout_seconds=3.0),
        _check_with_timeout("redis", lambda: _check_redis(redis_client), timeout_seconds=2.0),
        _check_with_timeout("disk_space", _check_disk_space, timeout_seconds=1.0),
        _check_with_timeout("ml_model", lambda: _check_ml_model_loaded(classifier), timeout_seconds=5.0),
        return_exceptions=False,
    )

    all_healthy = all(r.healthy for r in results)
    unhealthy = [r for r in results if not r.healthy]

    status_body = {
        "status": "ready" if all_healthy else "not_ready",
        "checks": {
            r.name: {
                "healthy": r.healthy,
                "latency_ms": r.latency_ms,
                **({"error": r.error} if r.error else {}),
                **({"details": r.details} if r.details else {}),
            }
            for r in results
        },
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }

    if not all_healthy:
        log.warning(
            "readiness.check.failed",
            unhealthy_deps=[r.name for r in unhealthy],
            errors={r.name: r.error for r in unhealthy},
        )

    _readiness_cache = status_body
    _readiness_cache_time = now

    if not all_healthy:
        response.status_code = 503

    return status_body

Example Readiness Response

When all dependencies are healthy:

{
"status": "ready",
"checks": {
"database": {
"healthy": true,
"latency_ms": 4.2,
"details": {
"size": 10,
"free_size": 6,
"utilisation_pct": 40.0
}
},
"redis": {
"healthy": true,
"latency_ms": 0.8,
"details": {
"ping": true,
"used_memory_human": "2.41M",
"maxmemory_human": "256.00M"
}
},
"disk_space": {
"healthy": true,
"latency_ms": 0.2,
"details": {"total_gb": 100.0, "used_gb": 23.4, "free_gb": 76.6}
},
"ml_model": {
"healthy": true,
"latency_ms": 12.1,
"details": {
"model_name": "text-classifier",
"model_version": "1.4.2",
"loaded": true
}
}
},
"timestamp": "2026-03-07T09:14:32Z"
}

When the database connection pool is exhausted:

{
"status": "not_ready",
"checks": {
"database": {
"healthy": false,
"latency_ms": 3001.0,
"error": "Timed out after 3.0s"
},
"redis": {"healthy": true, "latency_ms": 0.9},
"disk_space": {"healthy": true, "latency_ms": 0.1},
"ml_model": {"healthy": true, "latency_ms": 11.8}
},
"timestamp": "2026-03-07T09:14:32Z"
}

HTTP 503. Kubernetes removes the pod from the load balancer.

4. py-healthcheck Library

py-healthcheck provides a structured way to register checks and expose them as HTTP endpoints. It works with Flask and other WSGI frameworks, and can be adapted for FastAPI.

# pip install py-healthcheck
from healthcheck import HealthCheck

health = HealthCheck()

# `db` and `redis_client` are the application's own (synchronous) clients.

def check_database():
    """Returns (is_healthy, message)."""
    try:
        db.execute("SELECT 1")
        return True, "Database connection OK"
    except Exception as e:
        return False, f"Database error: {e}"

def check_redis():
    try:
        redis_client.ping()
        return True, "Redis OK"
    except Exception as e:
        return False, f"Redis error: {e}"

def check_memory():
    """Fail if using > 90% of available memory."""
    import psutil
    mem = psutil.virtual_memory()
    if mem.percent > 90:
        return False, f"Memory usage too high: {mem.percent:.1f}%"
    return True, f"Memory OK: {mem.percent:.1f}% used"

health.add_check(check_database)
health.add_check(check_redis)
health.add_check(check_memory)

# Flask integration (for reference):
# app.add_url_rule("/healthcheck", "healthcheck", view_func=health.run)

# FastAPI adapter:
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/healthcheck")
def healthcheck():
    # health.run() returns (body, status_code, headers). The body is already
    # a JSON string, so pass it through as-is instead of returning it and
    # letting FastAPI JSON-encode it a second time.
    message, status_code, headers = health.run()
    return Response(content=message, status_code=status_code, headers=headers)

For production FastAPI services, the custom async implementation in Section 3 is preferable because it runs checks in parallel and has fine-grained timeout control. Use py-healthcheck for quick setups or when you need its EnvironmentDump feature:

from healthcheck import EnvironmentDump

envdump = EnvironmentDump()

def get_app_config():
    """Return non-sensitive configuration for debugging."""
    return {
        "version": "2.14.0",
        "db_pool_size": 10,
        "cache_ttl_seconds": 300,
    }

envdump.add_section("application", get_app_config)

@app.get("/environment")
def environment():
    """
    Expose non-sensitive configuration for operational debugging.
    Restrict access to internal networks - never expose publicly.
    """
    # As with health.run(), the body is an already-serialised JSON string.
    message, status_code, headers = envdump.run()
    return Response(content=message, status_code=status_code, headers=headers)

5. Startup Probe

Startup probes solve the slow-start problem. A service that takes 60 seconds to load an ML model will fail liveness checks (which might start after 30 seconds) and get killed before it is ready.

The startup probe runs first. While it is running, liveness and readiness probes do not run. When the startup probe succeeds, liveness and readiness probes begin.
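
A quick way to sanity-check a startup probe configuration is to compute roughly how long Kubernetes will wait before giving up. The helper below is a back-of-the-envelope sketch: it ignores per-probe timeouts and scheduling jitter, and the function names are illustrative rather than part of any Kubernetes tooling.

```python
def startup_budget_seconds(
    initial_delay_seconds: int,
    period_seconds: int,
    failure_threshold: int,
) -> int:
    """Approximate time a container has to pass its startup probe."""
    return initial_delay_seconds + period_seconds * failure_threshold


def covers_startup(worst_case_startup_seconds: float, budget_seconds: int) -> bool:
    """True if the probe budget exceeds the worst-case startup time."""
    return budget_seconds > worst_case_startup_seconds
```

For the deployment in Section 1 (initialDelaySeconds: 5, periodSeconds: 5, failureThreshold: 24) the budget is about 125 seconds, which comfortably covers a 60-second model load but would not cover a 3-minute one.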

Startup Probe for an ML Service

# app/api/routes/health.py (continued)

import asyncio
import time
from typing import Optional

# Set by the model loading code when initialisation is complete
_startup_complete: bool = False
_startup_error: Optional[str] = None
_startup_start_time: float = time.monotonic()


def mark_startup_complete() -> None:
    """Call this from the lifespan function after all initialisation is done."""
    global _startup_complete
    _startup_complete = True
    duration = time.monotonic() - _startup_start_time
    log.info("startup.complete", duration_seconds=round(duration, 2))


def mark_startup_failed(error: str) -> None:
    """Call this if startup fails - the startup probe will fail and Kubernetes will restart."""
    global _startup_error
    _startup_error = error
    log.error("startup.failed", error=error)


@router.get("/startup")
async def startup_probe(response: Response):
    """
    Startup probe: returns 200 only when all initialisation is complete.

    Kubernetes runs this instead of liveness/readiness until it succeeds.
    Configure failureThreshold * periodSeconds > worst-case startup time.
    """
    if _startup_error:
        response.status_code = 503
        return {
            "status": "failed",
            "error": _startup_error,
            "elapsed_seconds": round(time.monotonic() - _startup_start_time, 1),
        }

    if not _startup_complete:
        response.status_code = 503
        return {
            "status": "starting",
            "elapsed_seconds": round(time.monotonic() - _startup_start_time, 1),
            "message": "Initialisation in progress",
        }

    return {
        "status": "started",
        "elapsed_seconds": round(time.monotonic() - _startup_start_time, 1),
    }

Integration with Lifespan

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog

log = structlog.get_logger()

@asynccontextmanager
async def lifespan(app: FastAPI):
    from app.api.routes.health import mark_startup_complete, mark_startup_failed

    try:
        log.info("startup.db_pool.initialising")
        await initialise_db_pool()

        log.info("startup.ml_model.loading")
        await load_ml_model()  # May take 30–60 seconds

        log.info("startup.cache.warming")
        await warm_cache()

        log.info("startup.migrations.checking")
        await verify_migrations()

        # All initialisation complete - startup probe will now return 200
        mark_startup_complete()

    except Exception as exc:
        mark_startup_failed(str(exc))
        # Do not re-raise: the app keeps running and /startup returns 503 with
        # the error, so Kubernetes restarts the container once failureThreshold
        # is reached. Re-raising would make the process exit, which also
        # triggers a restart but does not give you a clean startup probe
        # failure message.
        log.critical("startup.failed", error=str(exc), exc_info=True)

    yield

    # Graceful shutdown
    log.info("shutdown.starting")
    await shutdown_db_pool()
    log.info("shutdown.complete")

6. SLOs and Error Budgets

Service Level Objectives (SLOs) define what "good enough" means for your service. They are the contract between engineering and the business.

The SLO Hierarchy

SLI (Service Level Indicator)
- A specific metric that measures service health
- Example: "The fraction of HTTP requests that return 2xx in < 1s"

SLO (Service Level Objective)
- A target value for an SLI, over a time window
- Example: "99.9% of requests succeed within 1s, measured over 30 days"

Error Budget
- The amount of failures allowed before the SLO is violated
- Example: 30 days × 24 hours × 60 min × 0.1% = 43.2 minutes of downtime
- OR: 1,000,000 requests × 0.1% = 1,000 failing requests

SLA (Service Level Agreement)
- A formal contract with financial consequences for SLO violations
- SLO ≥ SLA target (your SLO is your internal goal; SLA is the customer commitment)

Calculating an Error Budget

# tools/error_budget.py
"""
Calculate error budget consumption for a given SLO.
"""
from dataclasses import dataclass
from datetime import timedelta

@dataclass
class ErrorBudget:
    slo_percentage: float  # e.g., 99.9
    window_days: int  # e.g., 30

    @property
    def error_rate_allowed(self) -> float:
        """Fraction of requests allowed to fail."""
        return 1.0 - (self.slo_percentage / 100.0)

    @property
    def allowed_downtime(self) -> timedelta:
        """Total allowed downtime in the window."""
        total_minutes = self.window_days * 24 * 60
        allowed_minutes = total_minutes * self.error_rate_allowed
        return timedelta(minutes=allowed_minutes)

    def budget_remaining(self, failed_requests: int, total_requests: int) -> float:
        """
        Calculate remaining error budget as a percentage.
        0% = SLO violated, 100% = no errors.
        """
        actual_error_rate = failed_requests / max(total_requests, 1)
        consumed = actual_error_rate / self.error_rate_allowed
        return max(0.0, (1.0 - consumed) * 100.0)

    def burn_rate(self, failed_requests: int, total_requests: int) -> float:
        """
        Error budget burn rate.
        1.0 = burning at exactly the SLO rate (budget will be zero at end of window)
        2.0 = burning twice as fast (will exhaust budget halfway through window)
        """
        actual_error_rate = failed_requests / max(total_requests, 1)
        return actual_error_rate / self.error_rate_allowed


# Example:
budget = ErrorBudget(slo_percentage=99.9, window_days=30)
print(f"Allowed downtime: {budget.allowed_downtime}")
# Allowed downtime: 0:43:12

print(f"Error rate allowed: {budget.error_rate_allowed:.4%}")
# Error rate allowed: 0.1000%

# If we've had 500 failures out of 1,000,000 requests:
print(f"Budget remaining: {budget.budget_remaining(500, 1_000_000):.1f}%")
# Budget remaining: 50.0%

print(f"Burn rate: {budget.burn_rate(500, 1_000_000):.1f}x")
# Burn rate: 0.5x (we're using only half our budget)

PromQL SLO Queries

# SLI: fraction of requests that succeed
sum(rate(http_requests_total{status=~"2.."}[30d]))
/
sum(rate(http_requests_total[30d]))

# Error budget remaining (as a percentage)
(
1 -
(
sum(rate(http_requests_total{status=~"5.."}[30d]))
/
sum(rate(http_requests_total[30d]))
)
/
0.001 # 1 - 0.999 = 0.001 for a 99.9% SLO
) * 100

# Burn rate over the last hour
# If sustained at > 14.4x, a 30-day error budget is exhausted in under 50 hours (720h / 14.4)
(
sum(rate(http_requests_total{status=~"5.."}[1h]))
/
sum(rate(http_requests_total[1h]))
)
/
0.001 # error rate budget
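
The relationship between burn rate, window length, and time to exhaustion is plain arithmetic, sketched below. The function names are illustrative, and the calculation assumes the burn rate stays constant over the period in question.

```python
def hours_to_exhaustion(burn_rate: float, window_days: int = 30) -> float:
    """Hours until the whole error budget is spent at a constant burn rate."""
    return window_days * 24 / burn_rate


def budget_consumed_fraction(
    burn_rate: float, hours: float, window_days: int = 30
) -> float:
    """Fraction of the window's error budget consumed after `hours`."""
    return burn_rate * hours / (window_days * 24)
```

For a 30-day window, a burn rate of 14.4 spends about 2% of the budget per hour and would exhaust it in roughly 50 hours. A burn rate of 1.0 spends the budget exactly over the full 720-hour window.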

Burn Rate Alerting (Google SRE Workbook Method)

# config/alerts.yml (additions)
groups:
  - name: slo_alerts
    rules:
      # Fast burn: high urgency (page now)
      # Burns 14.4x faster than allowed → consumes 2% of the 30-day budget in 1 hour
      - alert: ErrorBudgetBurnRateHigh
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            /
            sum(rate(http_requests_total[1h]))
          ) / 0.001 > 14.4
        for: 5m
        labels:
          severity: critical
          page: "true"
        annotations:
          summary: "Error budget burning at {{ $value | printf \"%.1f\" }}x - exhaustion in < 50h"

      # Medium burn: investigate in the next few hours
      - alert: ErrorBudgetBurnRateMedium
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[6h]))
            /
            sum(rate(http_requests_total[6h]))
          ) / 0.001 > 6
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Error budget burning at {{ $value | printf \"%.1f\" }}x"

      # Slow burn: investigate before end of week
      - alert: ErrorBudgetBurnRateSlow
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[24h]))
            /
            sum(rate(http_requests_total[24h]))
          ) / 0.001 > 3
        for: 1h
        labels:
          severity: info
        annotations:
          summary: "Error budget at risk - burning at {{ $value | printf \"%.1f\" }}x"

7. Synthetic Monitoring

Synthetic monitoring tests real user flows from the outside, without relying on internal metrics. It detects the difference between "the service is up" and "the service is working correctly."

Simple Internal Synthetic Check

# app/services/synthetic_monitor.py
"""
Synthetic health check that exercises a real user flow.
Run this as a scheduled background task (every 60 seconds).
"""
import asyncio
import time
import httpx
import structlog
from prometheus_client import Gauge, Counter, Histogram

log = structlog.get_logger()

synthetic_check_success = Counter(
    "synthetic_check_success_total",
    "Synthetic monitoring checks that passed",
    ["check_name"],
)
synthetic_check_failure = Counter(
    "synthetic_check_failure_total",
    "Synthetic monitoring checks that failed",
    ["check_name", "failure_reason"],
)
synthetic_check_duration_seconds = Histogram(
    "synthetic_check_duration_seconds",
    "Duration of synthetic monitoring checks",
    ["check_name"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float("inf")],
)
synthetic_check_last_success = Gauge(
    "synthetic_check_last_success_timestamp",
    "Unix timestamp of the last successful synthetic check",
    ["check_name"],
)


async def run_document_upload_check(base_url: str) -> None:
    """
    Synthetic check: upload a test document and verify classification.

    This exercises: authentication, file upload, ML inference, DB write.
    """
    check_name = "document_upload_and_classify"
    start = time.perf_counter()

    try:
        async with httpx.AsyncClient(base_url=base_url, timeout=15.0) as client:
            # Step 1: Upload a known test document
            test_content = b"This is a synthetic monitoring test document about technology."
            upload_response = await client.post(
                "/api/documents",
                files={"file": ("test.txt", test_content, "text/plain")},
                headers={"Authorization": f"Bearer {get_synthetic_test_token()}"},
            )

            if upload_response.status_code != 200:
                raise RuntimeError(
                    f"Upload returned {upload_response.status_code}: {upload_response.text[:200]}"
                )

            doc_id = upload_response.json()["id"]

            # Step 2: Verify the document was classified correctly
            classify_response = await client.get(
                f"/api/documents/{doc_id}",
                headers={"Authorization": f"Bearer {get_synthetic_test_token()}"},
            )

            if classify_response.status_code != 200:
                raise RuntimeError(
                    f"Fetch returned {classify_response.status_code}"
                )

            doc = classify_response.json()
            if doc.get("category") != "technology":
                raise RuntimeError(
                    f"Expected category 'technology', got '{doc.get('category')}'"
                )

            # Step 3: Clean up the test document
            await client.delete(
                f"/api/documents/{doc_id}",
                headers={"Authorization": f"Bearer {get_synthetic_test_token()}"},
            )

        duration = time.perf_counter() - start
        synthetic_check_success.labels(check_name=check_name).inc()
        synthetic_check_last_success.labels(check_name=check_name).set_to_current_time()
        synthetic_check_duration_seconds.labels(check_name=check_name).observe(duration)

        log.info(
            "synthetic.check.passed",
            check_name=check_name,
            duration_ms=round(duration * 1000, 1),
        )

    except Exception as exc:
        duration = time.perf_counter() - start
        failure_reason = type(exc).__name__
        synthetic_check_failure.labels(
            check_name=check_name,
            failure_reason=failure_reason,
        ).inc()
        synthetic_check_duration_seconds.labels(check_name=check_name).observe(duration)

        log.error(
            "synthetic.check.failed",
            check_name=check_name,
            error=str(exc),
            duration_ms=round(duration * 1000, 1),
        )


async def run_synthetic_monitoring_loop(base_url: str, interval_seconds: float = 60.0):
    """Background task that runs synthetic checks periodically."""
    while True:
        await asyncio.gather(
            run_document_upload_check(base_url),
            return_exceptions=True,  # Don't crash the loop on check failures
        )
        await asyncio.sleep(interval_seconds)

Synthetic Monitoring Alert

# Add to config/alerts.yml
- alert: SyntheticCheckFailing
  expr: |
    (time() - synthetic_check_last_success_timestamp) > 300
  for: 0m  # Alert immediately
  labels:
    severity: critical
    page: "true"
  annotations:
    summary: "Synthetic check '{{ $labels.check_name }}' has not passed in > 5 minutes"
    description: >
      The synthetic end-to-end health check is failing.
      This means real user flows are broken even if all dependency checks pass.

8. Health Check Anti-Patterns

These patterns appear in production services frequently. Each one is a reliability failure waiting to happen.

Anti-Pattern 1: Always Returning 200

# WRONG - the incident that opened this lesson
@app.get("/health")
async def health():
    return {"status": "ok"}  # Never fails. Useless.

Anti-Pattern 2: Checking the Wrong Dependencies

# WRONG - checking a non-critical system in readiness
@app.get("/readiness")
async def readiness():
    # This service's primary function is document processing.
    # The analytics warehouse is NOT needed for core functionality.
    try:
        await analytics_warehouse.ping()
    except Exception:
        return Response(status_code=503)  # Pod removed from LB for a non-critical dep!
    return {"status": "ready"}

Only check dependencies that are critical for serving the request. Non-critical dependencies that degrade functionality (analytics, notifications, audit logging) should be logged as warnings but should not fail readiness.
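
One possible way to encode this rule is to tag each check as critical or non-critical and let only the critical ones decide readiness. The sketch below is an assumption about how that could be structured; CheckResult and evaluate_readiness are hypothetical names and are not part of the readiness endpoint in Section 3.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class CheckResult:
    name: str
    healthy: bool
    critical: bool  # True if the service cannot serve requests without it


def evaluate_readiness(results: List[CheckResult]) -> Tuple[bool, List[str]]:
    """Return (ready, warnings).

    Only critical checks affect readiness. Failed non-critical checks are
    returned as warnings so the caller can log them.
    """
    ready = all(r.healthy for r in results if r.critical)
    warnings = [r.name for r in results if not r.healthy and not r.critical]
    return ready, warnings
```

With a healthy database (critical) and a failing analytics warehouse (non-critical), this returns ready with one warning, so the pod stays in rotation while the degraded dependency is still visible in the logs.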

Anti-Pattern 3: Too Slow

# WRONG - sequential checks with no timeout
@app.get("/readiness")
async def readiness():
    await check_database()      # 2s if slow
    await check_redis()         # 2s if slow
    await check_s3()            # 2s if slow
    await check_external_api()  # 2s if slow
    return {"status": "ready"}
# Total: up to 8 seconds. Kubernetes probe timeout: 10 seconds.
# If Kubernetes probes every 10 seconds, the pod is nearly always
# running a health check - consuming real resources.

Fix: use asyncio.gather + asyncio.timeout as shown in Section 3.

Anti-Pattern 4: Leaking Sensitive Information

# WRONG - exposing internal information publicly
@app.get("/health")
async def health():
    return {
        "status": "ok",
        "database_url": settings.DATABASE_URL,  # Leaks credentials!
        "redis_host": settings.REDIS_HOST,  # Leaks topology!
        "api_keys": {
            "openai": settings.OPENAI_API_KEY,  # Leaks secrets!
        },
    }

Health endpoints are often publicly accessible (load balancer health checks do not authenticate). Never expose credentials, internal hostnames, or configuration that would help an attacker.
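
If the detailed check output is still useful internally, one option is to keep the full body for logs or an internal-only route and return a reduced view to anything that might be publicly reachable. The helper below is a minimal sketch of that idea; public_view is a hypothetical name, and the input shape is assumed to match the readiness response shown in Section 3.

```python
from typing import Any, Dict


def public_view(status_body: Dict[str, Any]) -> Dict[str, Any]:
    """Strip error strings and details, keeping only pass/fail per check."""
    return {
        "status": status_body["status"],
        "checks": {
            name: {"healthy": check["healthy"]}
            for name, check in status_body.get("checks", {}).items()
        },
    }
```

Error strings in particular can carry hostnames or connection strings from driver exceptions, so dropping them from the external response is a cheap precaution.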

Anti-Pattern 5: Same Endpoint for Liveness and Readiness

# WRONG - using the same check for both probes
@app.get("/liveness")
@app.get("/readiness")
async def health():
    await check_database()  # This is the readiness check
    return {"status": "ok"}

If you use the same endpoint for both probes, a database failure triggers liveness failures, which causes pod restarts, which causes cascading failures. Use separate endpoints.

Anti-Pattern 6: Not Testing Health Checks

# Write these tests - they save incidents

import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch

def test_readiness_returns_503_when_db_is_down(client: TestClient):
    with patch("app.database.db_pool") as mock_pool:
        mock_pool.acquire.side_effect = Exception("Connection refused")
        response = client.get("/readiness")
    assert response.status_code == 503
    assert response.json()["checks"]["database"]["healthy"] is False

def test_liveness_returns_200_when_db_is_down(client: TestClient):
    """Liveness should still return 200 even when the DB is down."""
    with patch("app.database.db_pool") as mock_pool:
        mock_pool.acquire.side_effect = Exception("Connection refused")
        response = client.get("/liveness")
    assert response.status_code == 200  # Liveness does NOT check DB

Summary Anti-Patterns Table

Anti-Pattern | Consequence | Fix
Always return 200 | Broken pods receive traffic | Check real dependencies
Check DB in liveness | Cascading restarts on DB unavailability | Only check event loop in liveness
Sequential dependency checks | Health check slower than probe timeout | asyncio.gather for parallel checks
No per-check timeout | One hung dependency blocks all checks | asyncio.timeout per check
Check non-critical deps in readiness | Pod removed from LB for irrelevant failures | Only check critical deps
Expose secrets in health response | Credential leak to anyone who can hit /health | Return status only, no config
No health check tests | "Fixed" the health check and broke it | Test 200 and 503 paths
Same endpoint for liveness and readiness | DB failure causes pod restarts | Separate endpoints, separate logic

Interview Questions and Answers

Q1: Your Kubernetes deployment has three replicas. The database goes down for 3 minutes. Describe what happens to your service if readiness checks the database, vs if liveness checks the database.

If readiness checks the database: All three pods fail their readiness check. Kubernetes removes all three pods from the load balancer. Traffic starts returning 503 from the load balancer. The pods remain running. After 3 minutes, the database recovers. Readiness checks pass. Pods are added back to the load balancer. Service recovers fully, no restarts, no data loss. Total user impact: 3 minutes of 503s, but all state preserved.

If liveness checks the database: All three pods fail their liveness check. After failureThreshold consecutive failures (e.g., 3 × 10s = 30 seconds), Kubernetes kills and restarts the container in all three pods. The restarted containers try to connect to the still-down database and fail again. Kubernetes applies exponential backoff between restarts (10s, 20s, 40s...). After 3 minutes, the database comes back, but the pods are in CrashLoopBackOff with long backoff timers. The service might not fully recover for 10–15 additional minutes. This is a cascading failure caused by a health check design error.

Q2: A readiness check for your service calls an external payment provider's API. The payment provider has a 60-second outage. What happens to your service's pods, and is this the right behaviour?

All pods fail readiness and are removed from the load balancer. Customers cannot use any feature of your application, including ones that do not involve payments. This is almost certainly wrong. The payment dependency should only fail readiness if your service literally cannot function without it - for a payment service, that makes sense. For an application where payments are one feature among many, you should remove the payment check from readiness entirely, add monitoring to detect when the payment API is unavailable, and implement graceful degradation: disable the payment button in the UI when the API is unreachable. Readiness should reflect "can I serve any request?", not "can I serve every possible request?"
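A sketch of that split is shown below. The `evaluate_readiness` helper and the `CRITICAL` set are hypothetical names chosen for illustration, and which dependencies count as critical is an assumption each service has to make for itself. Only critical checks decide the status code; non-critical failures are reported as degraded so monitoring can pick them up.

```python
CRITICAL = {"database", "redis"}  # Illustrative: deps without which no request can be served


def evaluate_readiness(check_results: dict[str, bool]) -> tuple[int, dict]:
    """Return (status_code, body) where only critical checks can produce a 503."""
    failed = {name for name, ok in check_results.items() if not ok}
    critical_failed = sorted(failed & CRITICAL)
    degraded = sorted(failed - CRITICAL)

    if critical_failed:
        return 503, {"status": "unavailable", "failed": critical_failed, "degraded": degraded}
    if degraded:
        # Still ready: the pod stays in the load balancer, and the affected
        # feature can be disabled elsewhere (for example, in the UI).
        return 200, {"status": "degraded", "failed": [], "degraded": degraded}
    return 200, {"status": "ok", "failed": [], "degraded": []}
```

For example, `evaluate_readiness({"database": True, "redis": True, "payments": False})` keeps the pod in rotation with a 200 and lists `payments` under `degraded`, which is the signal an alert would key on.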

Q3: What is the error budget for a 99.95% SLO over 30 days, and how does a 10x burn rate affect your timeline for exhausting it?

Error rate allowed = 1 - 0.9995 = 0.0005 (0.05%). A 30-day window is 43,200 minutes, so the error budget in minutes = 43,200 × 0.0005 = 21.6 minutes. At a 10x burn rate (actual error rate is 10 × 0.05% = 0.5%), you exhaust the budget in 30 days / 10 = 3 days. Google's SRE Workbook recommends paging immediately when the burn rate exceeds 14.4x over a one-hour window (2% of the budget spent in that hour, full exhaustion in about 50 hours), and alerting when it exceeds 6x over a six-hour window (5% of the budget spent, full exhaustion in 5 days).
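The arithmetic can be checked with a few lines of Python. The function names below are illustrative helpers, not part of any monitoring library, and the model assumes a constant burn rate over a fixed 30-day window.

```python
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Minutes of full unavailability the SLO allows over the window."""
    window_minutes = window_days * 24 * 60
    return window_minutes * (1 - slo)


def days_to_exhaustion(burn_rate: float, window_days: int = 30) -> float:
    """Days until the whole budget is spent at a constant burn rate."""
    return window_days / burn_rate


def budget_fraction_consumed(
    burn_rate: float, alert_window_hours: float, window_days: int = 30
) -> float:
    """Fraction of the budget spent by burning at burn_rate for alert_window_hours."""
    return burn_rate * alert_window_hours / (window_days * 24)


print(round(error_budget_minutes(0.9995), 1))  # 21.6
print(days_to_exhaustion(10))                  # 3.0
```

`budget_fraction_consumed` answers the question burn-rate alerts are built on: how much of the budget an incident has already spent by the time the alert window fills. A 14.4x burn sustained for one hour spends 14.4 / 720 = 2% of a 30-day budget.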

Q4: Your ML service loads a 2GB model at startup, which takes 90 seconds. The Kubernetes liveness probe has initialDelaySeconds: 30 and failureThreshold: 3. What happens, and how do you fix it?

After 30 seconds, the liveness probe starts. The service is still loading the model, so the probe fails. Assuming the default periodSeconds of 10, three consecutive failures take roughly 30 + 3×10 = 60 seconds in total, at which point Kubernetes kills and restarts the container. The pod is stuck in an infinite restart loop because it always needs 90 seconds to start but is killed after about 60. The fix is a startup probe with a deadline longer than the worst-case startup time:

startupProbe:
  httpGet:
    path: /startup
    port: 8001
  periodSeconds: 5
  failureThreshold: 30  # 30 × 5s = 150 seconds maximum startup time

While the startup probe is running, liveness and readiness probes do not run. Once the startup probe succeeds (at ~90 seconds), liveness and readiness probes begin normally.

Q5: How do you implement a health check cache without introducing a race condition in an asyncio application?

An asyncio application is single-threaded within the event loop, so a simple module-level variable with a timestamp is safe from data races: only one coroutine runs at a time, and a coroutine can only be suspended at an await. The implementation in this lesson (_readiness_cache and _readiness_cache_time) is therefore race-condition-free in pure asyncio, provided the cached value and its timestamp are updated together with no await in between. The one caveat is duplicated work: several requests that arrive while the cache is stale can each start their own round of dependency checks, which an asyncio.Lock around the refresh prevents. If you run multiple uvicorn workers with --workers 4, each worker is a separate process with its own memory. Each process maintains its own cache independently. This is fine; it just means each worker checks dependencies every 5 seconds on its own rather than sharing a cache. If you need a shared cache in a multi-process setup to further reduce dependency load, use Redis as a shared cache store with a TTL, but this makes Redis a dependency of the health check that checks Redis, so be careful not to create circular dependencies.

© 2026 EngineersOfAI. All rights reserved.